package org.niit.service

import java.text.SimpleDateFormat

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.niit.common.TService
import org.niit.dao.BlackListDao
import org.niit.bean.AdClickData
import org.niit.util.MyKafkaUtil

class BlackListService extends TService{

  override def dataAnalysis(data: DStream[AdClickData]): Any = {


    val ds: DStream[((String, String, String), Int)] = filterKafkaData(data)
	//检查用户点击次数 -->
    checkUserCount(ds)



    }

  //利用在Kafka中采集出来的流式数据对数据库进行实时检索
  //将没有在黑名单的用户过滤出去
  private def filterKafkaData(adClickData: DStream[AdClickData]): DStream[((String, String, String), Int)] ={
   //1.将DStream中的AdClickData 进行转换操作-->RDD,再进行过滤和转换

    adClickData.transform(rdd=>{
      //2.判断点击用户是否在黑名单当中
      //Driver端
      //当用户不存在黑名单当中进行返回
      val filterRDD = rdd.filter(data=>{
        //Excetor
        val blackListDao = new BlackListDao//如果rdd操作中要执行外部参数，建议写rdd操作的里面
        !blackListDao.selectBlackUserById(data.user)
      })

      //如果用户不在黑名单当中，进行点击次数的统计（每个采集周期）
      val resData = filterRDD.map(data => {
        val sdf = new SimpleDateFormat("yyyy-MM-dd")
        //将时间戳 转换为日期
        val day = sdf.format(new java.util.Date(data.ts.toLong))
        val user = data.user
        val ad = data.ad
        //用户在某一天点击某个广告 1 次
        //(hello,1) (hello,1) (hello,1) ==> (hello,3)
        // （（2023-4-3 张三 加多宝） 1 ） （（2023-4-3 张三 加多宝） 1 ） （（2023-4-3 张三 加多宝） 1 ） （（2023-4-3 张三 京东） 1 ）
        ((day, user, ad), 1) // (word  ,  count)
        //    key         value
      })
      // // （（2023-4-3 张三 加多宝） 1 ） （（2023-4-3 张三 加多宝） 1 ） （（2023-4-3 张三 加多宝） 1 ） ==》（（2023-4-3 张三 加多宝） 3 ）
      val reData = resData.reduceByKey(_ + _)
     // （（2023-4-3 张三 加多宝） 3 ）  将合并后的数据进行返回
      reData

    })

  }

  //检查用户的点击次数，如果达到规定次数则拉黑，否则存入到点击次数表中，存入表后，发现点击次数超过规定次数也会拉入黑名单当中
  private def checkUserCount(ds:DStream[((String, String, String), Int)]): Unit ={
    //SparkStreaming输出 print foreachRdd saveAs......
    ds.foreachRDD( rdd =>{
      rdd.foreach{
        case ((day,user,ad),count)=>{
            println(s"${day},${user},${ad},${count}")
            //如果点击次数超过阈值（30）,那么将用户拉入黑名单
            val blackListDao = new BlackListDao
            if(count >= 30){
              blackListDao.insertBlackList(user)
            }else{
              //如果没有超过阈值，那么就将数据存入到点击次数表中
              blackListDao.insertUserAdCount(day,user,ad,count)
            }
          //如果存入点击次数表中，发现超过了规定阈值，也会拉黑
          val bool = blackListDao.checkUserByCount(day, user, ad, 30)
          if(bool){
            blackListDao.insertBlackList(user)
          }
        }
      }

    })
  }

}
